package com.cloudansys.core.flink.function;

import com.cloudansys.core.constant.Const;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
public class PrintSpeedFunc extends ProcessFunction<String, String> {

    private Long lastTime = 0L;
    private List<String> typeIds = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        lastTime = System.currentTimeMillis();
        typeIds = new ArrayList<>();
    }

    /**
     * 打印数据频率
     */
    @Override
    public void processElement(String payload, Context context, Collector<String> out) {
        log.info("payload: {}", payload);
        try {
            String typeId = payload.split(Const.PTN_COMMA)[3];
            typeIds.add(typeId);
            long thisTime = System.currentTimeMillis();
            long diffTime = thisTime - lastTime;
            if (diffTime > 1000) {
                Collections.sort(typeIds);
                log.info("size: {},diffTime: {},typeIds: {}", typeIds.size(), diffTime, typeIds);
                lastTime = System.currentTimeMillis();
                typeIds.clear();
            }
        } catch (Exception e) {
            log.error("print speed error");
            e.printStackTrace();
        }
        out.collect(payload);
    }

}
